 1. Flink Kafka Connector: Understanding the Source Code
   
   Function: UDF, the logic that processes the data
   RichFunction: the open/close methods manage the function's lifecycle; RuntimeContext is the function's runtime context
   SourceFunction: provides custom data sources; its run method is where the data is fetched
   ParallelSourceFunction: a SourceFunction that can run with parallelism greater than one
   Creating a new streaming data source consumer
   The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from Apache Kafka. The
consumer can run in multiple parallel instances, each of which pulls data from one or more Kafka partitions.
   The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost during a failure
and that the computation processes elements exactly once.
   (Note: these guarantees naturally assume that Kafka itself does not lose any data.)
   Note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets committed
to Kafka are only to bring the outside view of progress in sync with Flink's view of progress. That way,
monitoring and other jobs can get a view of how far the Flink Kafka consumer has consumed a topic.
   FlinkKafkaConsumerBase:
   The base class of all Flink Kafka Consumer data sources. It implements the behavior common to all Kafka versions.
   Recall custom data sources:
   the open method and the run method
   Flink Kafka consumer example:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class FromKafka {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", "linux122:9092");

		FlinkKafkaConsumer<String> consumer =
			new FlinkKafkaConsumer<>("mytopic", new SimpleStringSchema(), properties);
		// start consuming from the earliest offset
		consumer.setStartFromEarliest();

		DataStream<String> stream = env.addSource(consumer);
		stream.print();

		env.execute();
	}
}
   How does the Flink Kafka consumer consume data, and how are partitions assigned?
   The open method, from the source:
   (1). Determine the offset commit mode
   OffsetCommitMode:
   OffsetCommitMode represents the behaviour of how offsets are externally committed back to the Kafka
brokers / ZooKeeper. Its exact value is determined at runtime in the consumer subtasks.
/**
 * The offset commit mode represents the behaviour of how offsets are externally committed
 * back to Kafka brokers / Zookeeper.
 *
 * <p>The exact value of this is determined at runtime in the consumer subtasks.
 */
@Internal
public enum OffsetCommitMode {

	/** Completely disable offset committing. */
	DISABLED,

	/** Commit offsets back to Kafka only when checkpoints are completed. */
	ON_CHECKPOINTS,

	/** Commit offsets periodically back to Kafka, using the auto commit functionality of internal Kafka clients. */
	KAFKA_PERIODIC;
}
       DISABLED: completely disable offset committing.
       ON_CHECKPOINTS: commit offsets back to Kafka only when checkpoints are completed.
       KAFKA_PERIODIC: commit offsets back to Kafka periodically, using the auto-commit functionality of the internal Kafka clients.
public static OffsetCommitMode fromConfiguration(
			boolean enableAutoCommit,
			boolean enableCommitOnCheckpoint,
			boolean enableCheckpointing) {

		if (enableCheckpointing) {
			// if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
			return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
		} else {
			// else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
			return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
		}
	}
   The offset commit mode is determined from several configuration values:
   If checkpointing is enabled and committing offsets on checkpoint completion is enabled, return ON_CHECKPOINTS.
   If checkpointing is not enabled but auto commit is enabled, return KAFKA_PERIODIC.
   In all other cases, return DISABLED.
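   The decision table can be reproduced as a small, self-contained Java sketch (the enum and method here are local stand-ins, not Flink's actual classes):

```java
public class CommitModeDemo {
    // Local stand-in for Flink's OffsetCommitMode enum.
    enum Mode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    // Mirrors OffsetCommitModes.fromConfiguration: checkpointing takes
    // precedence over the Kafka client's enable.auto.commit setting.
    static Mode fromConfiguration(boolean enableAutoCommit,
                                  boolean enableCommitOnCheckpoint,
                                  boolean enableCheckpointing) {
        if (enableCheckpointing) {
            // checkpointing on: only commit-on-checkpoint matters
            return enableCommitOnCheckpoint ? Mode.ON_CHECKPOINTS : Mode.DISABLED;
        }
        // checkpointing off: only the Kafka auto-commit flag matters
        return enableAutoCommit ? Mode.KAFKA_PERIODIC : Mode.DISABLED;
    }

    public static void main(String[] args) {
        System.out.println(fromConfiguration(true, true, true));   // ON_CHECKPOINTS
        System.out.println(fromConfiguration(true, false, true));  // DISABLED
        System.out.println(fromConfiguration(true, true, false));  // KAFKA_PERIODIC
        System.out.println(fromConfiguration(false, true, false)); // DISABLED
    }
}
```

   Note that once checkpointing is enabled, the Kafka property enable.auto.commit is ignored entirely.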
   (2). Next, create and start the partition discoverer
	protected abstract AbstractPartitionDiscoverer createPartitionDiscoverer(
			KafkaTopicsDescriptor topicsDescriptor,
			int indexOfThisSubtask,
			int numParallelSubtasks);   
   Creates the partition discoverer that finds new partitions for this subtask.
   Parameter 1: topicsDescriptor describes whether we discover partitions for fixed topics or for a topic
pattern; it is the wrapper around fixedTopics and topicPattern. fixedTopics explicitly names the topics (the
fixed-topics case), while topicPattern is a regular expression matched against topic names, used for
partition discovery.
/**
 * A Kafka Topics Descriptor describes how the consumer subscribes to Kafka topics -
 * either a fixed list of topics, or a topic pattern.
 */
@Internal
public class KafkaTopicsDescriptor implements Serializable {
   Parameter 2: indexOfThisSubtask, the index of this consumer subtask.
   Parameter 3: numParallelSubtasks, the total number of parallel consumer subtasks.
   The method returns a partition discoverer instance.
   (3). Open the partition discoverer, initializing all required Kafka connections.
	/**
	 * Opens the partition discoverer, initializing all required Kafka connections.
	 *
	 * <p>NOTE: thread-safety is not guaranteed.
	 */
	public void open() throws Exception {
		closed = false;
		initializeConnections();
	}
   Note that thread safety is not guaranteed.
   Source of initializing the required Kafka connections:
	/** Establish the required connections in order to fetch topics and partitions metadata. */
	protected abstract void initializeConnections() throws Exception;
    
	KafkaPartitionDiscoverer:
	@Override
	protected void initializeConnections() {
		this.kafkaConsumer = new KafkaConsumer<>(kafkaProperties);
	}
   This creates the KafkaConsumer object.
   (4). Initialize the map of subscribed partitions:
		subscribedPartitionsToStartOffsets = new HashMap<>();
   This is the set of subscribed partitions, initialized here:
	private Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets;
   It holds the topic partitions that will be read, together with the initial offset to start reading each one from.
   (5). Discover all partitions contained in the fixed topics and in every topic matching topicPattern:
	/**
	 * Execute a partition discovery attempt for this subtask.
	 * This method lets the partition discoverer update what partitions it has discovered so far.
	 *
	 * @return List of discovered new partitions that this subtask should subscribe to.
	 */
	public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
   (6). If the consumer restores its state from a checkpoint, restoredState holds the offsets to restore to.
   A TreeMap is used so that the entries are sorted; the ordering matters when the restored state is used to
seed the partition discoverer.
	/**
	 * The offsets to restore to, if the consumer restores state from a checkpoint.
	 *
	 * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)} method.
	 *
	 * <p>Using a sorted map as the ordering is important when using restored state
	 * to seed the partition discoverer.
	 */
	private transient volatile TreeMap<KafkaTopicPartition, Long> restoredState;   
   It is populated in the initializeState method:
	@Override
	public final void initializeState(FunctionInitializationContext context) throws Exception {

		OperatorStateStore stateStore = context.getOperatorStateStore();

		this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME,
			createStateSerializer(getRuntimeContext().getExecutionConfig())));

		if (context.isRestored()) {
			restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());

			// populate actual holder for restored state
			for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
				restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
			}

			LOG.info("Consumer subtask {} restored state: {}.", getRuntimeContext().getIndexOfThisSubtask(), restoredState);
		} else {
			LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask());
		}
	}   
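   Why a sorted map gives deterministic behavior can be shown with a self-contained sketch (Tp is a simplified stand-in for KafkaTopicPartition, and the comparator mimics what KafkaTopicPartition.Comparator is assumed to do, ordering by topic name and then partition id):

```java
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;

public class SortedRestoreDemo {
    // Simplified stand-in for KafkaTopicPartition: a (topic, partition) pair.
    static class Tp {
        final String topic;
        final int partition;
        Tp(String topic, int partition) { this.topic = topic; this.partition = partition; }
    }

    // Orders by topic name, then partition id, analogous to KafkaTopicPartition.Comparator.
    static TreeMap<Tp, Long> newRestoredState() {
        return new TreeMap<>(Comparator.comparing((Tp t) -> t.topic)
                .thenComparingInt(t -> t.partition));
    }

    public static void main(String[] args) {
        TreeMap<Tp, Long> restored = newRestoredState();
        restored.put(new Tp("b-topic", 1), 42L);
        restored.put(new Tp("a-topic", 2), 7L);
        restored.put(new Tp("a-topic", 0), 13L);

        // Iteration order is deterministic regardless of insertion order,
        // so every restore seeds the partition discoverer identically.
        for (Map.Entry<Tp, Long> e : restored.entrySet()) {
            System.out.println(e.getKey().topic + "-" + e.getKey().partition + " -> " + e.getValue());
        }
    }
}
```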
   Recall the semantics of context.isRestored():
   It returns true when state was restored from the snapshot of a previous execution, i.e. when the job is
recovering from a failure; for stateless tasks it always returns false.
public interface ManagedInitializationContext {

	/**
	 * Returns true, if state was restored from the snapshot of a previous execution. This returns always false for
	 * stateless tasks.
	 */
	boolean isRestored();
}

   The open method then branches on the restored state:
   if (restoredState != null) {
	// restore-from-snapshot logic ...
   } else {
	// fresh-start logic ...
   }
   If restoredState holds no state for some partition, that partition must be consumed from the beginning:
		final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
		if (restoredState != null) {
			for (KafkaTopicPartition partition : allPartitions) {
				if (!restoredState.containsKey(partition)) {
					restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
				}
			}   
   Partitions that this subtask is not responsible for are filtered out:
 			for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
				// seed the partition discoverer with the union state while filtering out
				// restored partitions that should not be subscribed by this subtask
				if (KafkaTopicPartitionAssigner.assign(
					restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
						== getRuntimeContext().getIndexOfThisSubtask()){
					subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
				}
			}  
   The assign method:
   It returns the index of the target subtask that a particular Kafka partition should be assigned to.
	public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
		int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

		// here, the assumption is that the id of Kafka partitions are always ascending
		// starting from 0, and therefore can be used directly as the offset clockwise from the start index
		return (startIndex + partition.getPartition()) % numParallelSubtasks;
	}   
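   The assignment distributes partitions round-robin from a topic-dependent start index. A standalone re-implementation (the constants mirror the snippet above; the class and topic name are illustrative only):

```java
public class AssignDemo {
    // Mirrors KafkaTopicPartitionAssigner.assign: round-robin from a
    // start index derived from the topic's hash.
    static int assign(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        // partition ids are assumed to ascend from 0, so the id can be used
        // directly as the clockwise offset from the start index
        return (startIndex + partition) % numParallelSubtasks;
    }

    public static void main(String[] args) {
        // Six partitions of one topic spread evenly over three subtasks:
        for (int p = 0; p < 6; p++) {
            System.out.println("partition " + p + " -> subtask " + assign("mytopic", p, 3));
        }
    }
}
```

   Consecutive partition ids land on consecutive subtasks, so each subtask receives an even share of a topic's partitions.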
   subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
   stores each topic partition saved in restoredState, together with the offset at which to start reading, into
subscribedPartitionsToStartOffsets. restoredStateEntry.getKey() is a partition of some topic, and
restoredStateEntry.getValue() is the offset at which that partition should start being read.
   Partitions whose topic name no longer matches the topicsDescriptor's topicPattern are filtered out as well:
			if (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
				subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -> {
					if (!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) {
						LOG.warn(
							"{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution.",
							entry.getKey());
						return true;
					}
					return false;
				});
			}   
   (7). Starting the consumer fresh (no restored state):
	/** The startup mode for the consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
	private StartupMode startupMode = StartupMode.GROUP_OFFSETS;   
   The StartupMode enum has five values:
       GROUP_OFFSETS: start consuming from the offsets committed by the consumer group in ZooKeeper or the
Kafka brokers; this is the default.
       EARLIEST: start consuming from the earliest possible offset.
       LATEST: start consuming from the latest offset.
       TIMESTAMP: start consuming from a user-provided timestamp.
       SPECIFIC_OFFSETS: start consuming from user-provided offsets.
   Based on the startup mode, determine where to start consuming; the partition discoverer then fetches the initial seed partitions:
		} else {
			// use the partition discoverer to fetch the initial seed partitions,
			// and set their initial offsets depending on the startup mode.
			// for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
			// for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined
			// when the partition is actually read.
			switch (startupMode) {
				case SPECIFIC_OFFSETS:
					if (specificStartupOffsets == null) {
						throw new IllegalStateException(
							"Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
								", but no specific offsets were specified.");
					}

					for (KafkaTopicPartition seedPartition : allPartitions) {
						Long specificOffset = specificStartupOffsets.get(seedPartition);
						if (specificOffset != null) {
							// since the specified offsets represent the next record to read, we subtract
							// it by one so that the initial state of the consumer will be correct   
   If the startup mode is SPECIFIC_OFFSETS:
   Error case: no specific offsets were configured, so an IllegalStateException is thrown.
   Normal case: look up the configured starting offset for each partition:
   Long specificOffset = specificStartupOffsets.get(seedPartition);
	/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
	private Map<KafkaTopicPartition, Long> specificStartupOffsets;   
					for (KafkaTopicPartition seedPartition : allPartitions) {
						Long specificOffset = specificStartupOffsets.get(seedPartition);
						if (specificOffset != null) { // an offset was configured for this partition: start there
							// since the specified offsets represent the next record to read, we subtract
							// it by one so that the initial state of the consumer will be correct
							subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
						} else { // no offset configured for this partition: fall back to the group offset
							// default to group offset behaviour if the user-provided specific offsets
							// do not contain a value for this partition
							subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
						}
					}
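   The seeding logic of this branch can be sketched in plain Java (partition keys are plain strings here, and GROUP_OFFSET is a made-up stand-in sentinel; Flink's real sentinel lives in KafkaTopicPartitionStateSentinel):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SpecificOffsetsDemo {
    // Stand-in sentinel value meaning "use the committed group offset".
    static final long GROUP_OFFSET = Long.MIN_VALUE + 1;

    // Seeds start offsets the way the SPECIFIC_OFFSETS branch does:
    // user-provided offsets denote the *next* record to read, so the
    // stored state is offset - 1; partitions without a configured offset
    // fall back to the group-offset sentinel.
    static Map<String, Long> seed(List<String> seedPartitions, Map<String, Long> specificOffsets) {
        Map<String, Long> startOffsets = new HashMap<>();
        for (String partition : seedPartitions) {
            Long specific = specificOffsets.get(partition);
            startOffsets.put(partition, specific != null ? specific - 1 : GROUP_OFFSET);
        }
        return startOffsets;
    }

    public static void main(String[] args) {
        Map<String, Long> start = seed(
            List.of("mytopic-0", "mytopic-1"),
            Map.of("mytopic-0", 100L)); // no offset configured for mytopic-1
        System.out.println(start.get("mytopic-0")); // 99: next record to read is 100
        System.out.println(start.get("mytopic-1") == GROUP_OFFSET); // true
    }
}
```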
   The run method:
   (1). Check that the map of partitions and starting offsets has been set:
		if (subscribedPartitionsToStartOffsets == null) {
			throw new Exception("The partitions were not set for the consumer");
		}   
   (2). Counters that record the number of successful and failed Kafka offset commits:
	/** Counter for successful Kafka offset commits. */
	private transient Counter successfulCommits;

	/** Counter for failed Kafka offset commits. */
	private transient Counter failedCommits;   
   (3). Get the index of the current subtask:
		final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask(); 
	/**
	 * Gets the number of this parallel subtask. The numbering starts from 0 and goes up to
	 * parallelism-1 (parallelism as returned by {@link #getNumberOfParallelSubtasks()}).
	 *
	 * @return The index of the parallel subtask.
	 */
	int getIndexOfThisSubtask();		
   (4). Register a commit callback: on success the success counter is incremented, and on failure the failure counter is incremented:
		this.offsetCommitCallback = new KafkaCommitCallback() {
			@Override
			public void onSuccess() {
				successfulCommits.inc();
			}

			@Override
			public void onException(Throwable cause) {
				LOG.warn(String.format("Consumer subtask %d failed async Kafka commit.", subtaskIndex), cause);
				failedCommits.inc();
			}
		};   
   (5). Next, check whether subscribedPartitionsToStartOffsets is empty. If it is, mark the source as
temporarily idle.
		// mark the subtask as temporarily idle if there are no initial seed partitions;
		// once this subtask discovers some partitions and starts collecting records, the subtask's
		// status will automatically be triggered back to be active.
		if (subscribedPartitionsToStartOffsets.isEmpty()) {
			sourceContext.markAsTemporarilyIdle();
		}   
   (6). Create a KafkaFetcher, which pulls data from the Kafka brokers via the KafkaConsumer API:
		// from this point forward:
		//   - 'snapshotState' will draw offsets from the fetcher,
		//     instead of being built from `subscribedPartitionsToStartOffsets`
		//   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
		//     Kafka through the fetcher, if configured to do so)
		this.kafkaFetcher = createFetcher(
				sourceContext,
				subscribedPartitionsToStartOffsets,
				watermarkStrategy,
				(StreamingRuntimeContext) getRuntimeContext(),
				offsetCommitMode,
				getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
				useMetrics);   
   (7). Depending on the configured partition-discovery interval, decide whether to start the periodic
partition-discovery task. If no discovery interval was configured, only the data-fetching task is started;
otherwise both the periodic partition-discovery task and the data-fetching task are started.
		// depending on whether we were restored with the current state version (1.3),
		// remaining logic branches off into 2 paths:
		//  1) New state - partition discovery loop executed as separate thread, with this
		//                 thread running the main fetcher loop
		//  2) Old state - partition discovery is disabled and only the main fetcher loop is executed
		if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
			kafkaFetcher.runFetchLoop();
		} else {
			runWithPartitionDiscovery();
		}   
   
   Source of the loop that keeps fetching data:
	private void runWithPartitionDiscovery() throws Exception {
		final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
		createAndStartDiscoveryLoop(discoveryLoopErrorRef); // start the periodic partition-discovery task

		kafkaFetcher.runFetchLoop(); // start pulling data from the Kafka brokers

		// make sure that the partition discoverer is waked up so that
		// the discoveryLoopThread exits
		partitionDiscoverer.wakeup(); // wake the partition discoverer up so that the discovery loop thread exits
		joinDiscoveryLoopThread(); // wait for the discovery loop thread to finish

		// rethrow any fetcher errors
		final Exception discoveryLoopError = discoveryLoopErrorRef.get();
		if (discoveryLoopError != null) {
			throw new RuntimeException(discoveryLoopError);
		}
	}   
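   The error-propagation pattern used here, a background thread publishing its failure through an AtomicReference that the main thread inspects after joining, can be sketched standalone (all names below are illustrative):

```java
import java.util.concurrent.atomic.AtomicReference;

public class DiscoveryLoopPattern {
    // Runs a task on a background thread and returns the exception it
    // published, mirroring how runWithPartitionDiscovery surfaces
    // discovery-loop errors to the main fetch thread.
    static Exception runAndCollectError(Runnable task) throws InterruptedException {
        AtomicReference<Exception> errorRef = new AtomicReference<>();
        Thread discoveryLoop = new Thread(() -> {
            try {
                task.run();
            } catch (Exception e) {
                errorRef.set(e); // same role as discoveryLoopErrorRef
            }
        }, "discovery-loop");
        discoveryLoop.start();
        discoveryLoop.join(); // corresponds to joinDiscoveryLoopThread()
        return errorRef.get(); // the caller rethrows if this is non-null
    }

    public static void main(String[] args) throws InterruptedException {
        Exception err = runAndCollectError(() -> {
            throw new IllegalStateException("simulated discovery failure");
        });
        if (err != null) {
            // in Flink the main thread wraps and rethrows this
            System.out.println("propagated: " + err.getMessage());
        }
    }
}
```

   The point of the pattern is that an exception thrown on a plain Thread would otherwise be lost; the shared reference lets the main thread fail the job.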
   createAndStartDiscoveryLoop, the method that starts the partition-discovery task:
	private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {
		discoveryLoopThread = new Thread(() -> {
			try {
				// --------------------- partition discovery loop ---------------------

				// throughout the loop, we always eagerly check if we are still running before
				// performing the next operation, so that we can escape the loop as soon as possible

				while (running) {
					if (LOG.isDebugEnabled()) {
						LOG.debug("Consumer subtask {} is trying to discover new partitions ...", getRuntimeContext().getIndexOfThisSubtask());
					}   
   Try to discover new partitions:
					final List<KafkaTopicPartition> discoveredPartitions;
					try {
						discoveredPartitions = partitionDiscoverer.discoverPartitions();
					} catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) {
						// the partition discoverer may have been closed or woken up before or during the discovery;
						// this would only happen if the consumer was canceled; simply escape the loop
						break;
					}   
   Newly discovered partitions are added to the kafkaFetcher:
					// no need to add the discovered partitions if we were closed during the meantime
					if (running && !discoveredPartitions.isEmpty()) {
						kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
					}

					// do not waste any time sleeping if we're not running anymore
					if (running && discoveryIntervalMillis != 0) {
						try {
							Thread.sleep(discoveryIntervalMillis);
						} catch (InterruptedException iex) {
							// may be interrupted if the consumer was canceled midway; simply escape the loop
							break;
						}
					}
				}   
   Error handling, and starting the discovery thread:
			} catch (Exception e) {
				discoveryLoopErrorRef.set(e);
			} finally {
				// calling cancel will also let the fetcher loop escape
				// (if not running, cancel() was already called)
				if (running) {
					cancel();
				}
			}
		}, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());

		discoveryLoopThread.start();
	}   
   The call to partitionDiscoverer.discoverPartitions(), i.e. how a partition discovery attempt executes:
 	/**
	 * Execute a partition discovery attempt for this subtask.
	 * This method lets the partition discoverer update what partitions it has discovered so far.
	 *
	 * @return List of discovered new partitions that this subtask should subscribe to.
	 */
	public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
		if (!closed && !wakeup) { // make sure the source has been neither closed nor woken up
			try {
				List<KafkaTopicPartition> newDiscoveredPartitions;

				// (1) get all possible partitions, based on whether we are subscribed to fixed topics or a topic pattern
				if (topicsDescriptor.isFixedTopics()) { // fixed topics configured: fetch the partitions of those topics
					newDiscoveredPartitions = getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());
				} else { // otherwise fetch all topics and match them against the pattern
					List<String> matchedTopics = getAllTopics();

					// retain topics that match the pattern
					Iterator<String> iter = matchedTopics.iterator();
					while (iter.hasNext()) {
						if (!topicsDescriptor.isMatchingTopic(iter.next())) {
							iter.remove(); // remove each topic that does not match
						}
					}

					if (matchedTopics.size() != 0) { // if any topics matched, fetch their partitions
						// get partitions only for matched topics
						newDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics);
					} else { // otherwise set newDiscoveredPartitions to null
						newDiscoveredPartitions = null;
					}
				}

				// (2) eliminate partition that are old partitions or should not be subscribed by this subtask
				if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
					throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);
				} else {
					Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
					KafkaTopicPartition nextPartition;
					while (iter.hasNext()) {
						nextPartition = iter.next();
						// setAndCheckDiscoveredPartition records the partition as discovered
						// and returns whether this subtask should consume it
						if (!setAndCheckDiscoveredPartition(nextPartition)) {
							iter.remove();
						}
					}
				}  
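   The topic-pattern branch boils down to retaining only the topics whose names match a regular expression. A minimal standalone sketch of that filtering (topic names are made up; the real matching is done by KafkaTopicsDescriptor.isMatchingTopic):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;

public class TopicPatternDemo {
    // Retain only topics whose full name matches the pattern, like the
    // topicPattern branch of discoverPartitions above.
    static List<String> retainMatching(List<String> allTopics, Pattern topicPattern) {
        List<String> matched = new ArrayList<>(allTopics);
        Iterator<String> iter = matched.iterator();
        while (iter.hasNext()) {
            if (!topicPattern.matcher(iter.next()).matches()) {
                iter.remove(); // drop each topic that does not match
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<String> topics = List.of("orders-2023", "orders-2024", "audit-log");
        System.out.println(retainMatching(topics, Pattern.compile("orders-.*")));
        // -> [orders-2023, orders-2024]
    }
}
```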
   The kafkaFetcher's runFetchLoop method
   This method is the main entry point through which the FlinkKafkaConsumer fetches data; it keeps pulling
data from the Kafka brokers in a loop.
	@Override
	public void runFetchLoop() throws Exception {
		try {
			// kick off the actual Kafka consumer
			consumerThread.start(); // start the consumer thread; see its run method below

			while (running) {
				// this blocks until we get the next records
				// it automatically re-throws exceptions encountered in the consumer thread
				final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

				// get the records for each topic partition
				for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {

					List<ConsumerRecord<byte[], byte[]>> partitionRecords =
						records.records(partition.getKafkaPartitionHandle());

					partitionConsumerRecordsHandler(partitionRecords, partition);   
   The run method of the KafkaConsumerThread obtains the handover:
	@Override
	public void run() {
		// early exit check
		if (!running) {
			return;
		}

		// this is the means to talk to FlinkKafkaConsumer's main thread
		final Handover handover = this.handover; // the handover is the channel used to talk to FlinkKafkaConsumer's main thread
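   The Handover is essentially a single-slot, blocking hand-off between the consumer thread and the fetch loop. A much-simplified sketch of the idea (the real Handover also propagates exceptions and supports wakeup/close; MiniHandover and its names are illustrative only):

```java
import java.util.List;

public class MiniHandover {
    private Object next; // single slot, like Flink's Handover

    // The consumer thread publishes a batch; it blocks while the
    // previous batch has not been consumed yet (backpressure).
    public synchronized void produce(Object records) throws InterruptedException {
        while (next != null) {
            wait();
        }
        next = records;
        notifyAll();
    }

    // The fetch loop blocks until a batch is available (cf. handover.pollNext()).
    public synchronized Object pollNext() throws InterruptedException {
        while (next == null) {
            wait();
        }
        Object records = next;
        next = null;
        notifyAll();
        return records;
    }

    public static void main(String[] args) throws Exception {
        MiniHandover handover = new MiniHandover();
        Thread consumerThread = new Thread(() -> {
            try {
                handover.produce(List.of("rec-1", "rec-2"));
            } catch (InterruptedException ignored) {
            }
        });
        consumerThread.start();
        System.out.println(handover.pollNext()); // the batch handed over
        consumerThread.join();
    }
}
```

   The single slot is what makes pollNext block in runFetchLoop until the consumer thread has produced the next batch.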
   Back in the runFetchLoop method of the KafkaFetcher class:
	@Override
	public void runFetchLoop() throws Exception {
		try {
			// kick off the actual Kafka consumer
			consumerThread.start(); // start the consumer thread, which periodically hands the consumed data over to the handover

			while (running) {
				// this blocks until we get the next records
				// it automatically re-throws exceptions encountered in the consumer thread
				// pollNext blocks until the consumer thread has handed records to the handover
				final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

				// get the records for each subscribed topic partition
				for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {

					List<ConsumerRecord<byte[], byte[]>> partitionRecords =
						records.records(partition.getKafkaPartitionHandle()); // the records belonging to this partition

					partitionConsumerRecordsHandler(partitionRecords, partition);
				}
			}
		}
		finally {
			// this signals the consumer thread that no more work is to be done
			consumerThread.shutdown();
		}   
   The partitionConsumerRecordsHandler method:
	protected void partitionConsumerRecordsHandler(
			List<ConsumerRecord<byte[], byte[]>> partitionRecords,
			KafkaTopicPartitionState<T, TopicPartition> partition) throws Exception {

		for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
			// deserialize the record and hand it to the kafkaCollector for emission downstream
			deserializer.deserialize(record, kafkaCollector);

			// emit the actual records. this also updates offset state atomically and emits
			// watermarks
			emitRecordsWithTimestamps( // emit the records, update the offset, and generate timestamps and watermarks
				kafkaCollector.getRecords(),
				partition,
				record.offset(),
				record.timestamp());
			// if the end of the stream was signalled, stop the fetcher loop
			if (kafkaCollector.isEndOfStreamSignalled()) {
				// end of stream signaled
				running = false;
				break;
			}
		}
	}


	@Override
	public void open(Configuration configuration) throws Exception {
		// determine the offset commit mode
		// (one of DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC)
		this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
				getIsAutoCommitEnabled(),
				enableCommitOnCheckpoints,
				((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

		// create the partition discoverer
		this.partitionDiscoverer = createPartitionDiscoverer(
				topicsDescriptor,
				getRuntimeContext().getIndexOfThisSubtask(),
				getRuntimeContext().getNumberOfParallelSubtasks());
		// open() instantiates the internal KafkaConsumer
		this.partitionDiscoverer.open();
        
		// the map of subscribed partitions and their start offsets
		subscribedPartitionsToStartOffsets = new HashMap<>();
		final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
		if (restoredState != null) {
			// restoredState is non-null: the consumer was created by restoring from a snapshot
			for (KafkaTopicPartition partition : allPartitions) {
				if (!restoredState.containsKey(partition)) {
					restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
				}
			}

			for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
				// seed the partition discoverer with the union state while filtering out
				// restored partitions that should not be subscribed by this subtask
				if (KafkaTopicPartitionAssigner.assign(
					restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
						== getRuntimeContext().getIndexOfThisSubtask()){
					subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
				}
			}

			LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
					getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
		} else {
			// use the partition discoverer to fetch the initial seed partitions,
			// and set their initial offsets depending on the startup mode.
			// for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
			// for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined
			// when the partition is actually read.
			switch (startupMode) {
				case SPECIFIC_OFFSETS:
					if (specificStartupOffsets == null) {
						throw new IllegalStateException(
							"Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
								", but no specific offsets were specified.");
					}

					for (KafkaTopicPartition seedPartition : allPartitions) {
						Long specificOffset = specificStartupOffsets.get(seedPartition);
						if (specificOffset != null) {
							// since the specified offsets represent the next record to read, we subtract
							// it by one so that the initial state of the consumer will be correct
							subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
						} else {
							// default to group offset behaviour if the user-provided specific offsets
							// do not contain a value for this partition
							subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
						}
					}

					break;
				case TIMESTAMP:
					if (startupOffsetsTimestamp == null) {
						throw new IllegalStateException(
							"Startup mode for the consumer set to " + StartupMode.TIMESTAMP +
								", but no startup timestamp was specified.");
					}

					for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset
							: fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp).entrySet()) {
						subscribedPartitionsToStartOffsets.put(
							partitionToOffset.getKey(),
							(partitionToOffset.getValue() == null)
									// if an offset cannot be retrieved for a partition with the given timestamp,
									// we default to using the latest offset for the partition
									? KafkaTopicPartitionStateSentinel.LATEST_OFFSET
									// since the specified offsets represent the next record to read, we subtract
									// it by one so that the initial state of the consumer will be correct
									: partitionToOffset.getValue() - 1);
					}

					break;
				default:
					for (KafkaTopicPartition seedPartition : allPartitions) {
						subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
					}
			}

			if (!subscribedPartitionsToStartOffsets.isEmpty()) {
				switch (startupMode) {
					case EARLIEST:
						LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
							getRuntimeContext().getIndexOfThisSubtask(),
							subscribedPartitionsToStartOffsets.size(),
							subscribedPartitionsToStartOffsets.keySet());
						break;
					case LATEST:
						LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
							getRuntimeContext().getIndexOfThisSubtask(),
							subscribedPartitionsToStartOffsets.size(),
							subscribedPartitionsToStartOffsets.keySet());
						break;
					case TIMESTAMP:
						LOG.info("Consumer subtask {} will start reading the following {} partitions from timestamp {}: {}",
							getRuntimeContext().getIndexOfThisSubtask(),
							subscribedPartitionsToStartOffsets.size(),
							startupOffsetsTimestamp,
							subscribedPartitionsToStartOffsets.keySet());
						break;
					case SPECIFIC_OFFSETS:
						LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
							getRuntimeContext().getIndexOfThisSubtask(),
							subscribedPartitionsToStartOffsets.size(),
							specificStartupOffsets,
							subscribedPartitionsToStartOffsets.keySet());

						List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
						for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
							if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
								partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
							}
						}

						if (partitionsDefaultedToGroupOffsets.size() > 0) {
							LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
									"; their startup offsets will be defaulted to their committed group offsets in Kafka.",
								getRuntimeContext().getIndexOfThisSubtask(),
								partitionsDefaultedToGroupOffsets.size(),
								partitionsDefaultedToGroupOffsets);
						}
						break;
					case GROUP_OFFSETS:
						LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
							getRuntimeContext().getIndexOfThisSubtask(),
							subscribedPartitionsToStartOffsets.size(),
							subscribedPartitionsToStartOffsets.keySet());
				}
			} else {
				LOG.info("Consumer subtask {} initially has no partitions to read from.",
					getRuntimeContext().getIndexOfThisSubtask());
			}

			this.deserializer.open(() -> getRuntimeContext().getMetricGroup().addGroup("user"));
		}
	}

			
   The open method above contains the initialization logic of the FlinkKafkaConsumer:
   First, the offset commit mode is determined.
   Next, the partition discoverer is created and started.
   subscribedPartitionsToStartOffsets, the map of subscribed partitions, is initialized, then populated either
from the restored state or according to the startup mode.
   To exercise the run path, produce some messages into the topic:
   kafka-console-producer.sh --broker-list teacher2:9092 --topic mytopic
 
 2. Consumption strategies
   
   setStartFromGroupOffsets() [the default strategy]
   By default, reading resumes from the last saved offset information.
   If the application is started for the first time and no previous offsets can be read, consumption starts
according to the value of the auto.offset.reset property.
   setStartFromEarliest()
   Consume from the earliest data, ignoring any stored offset information.
   setStartFromLatest()
   Consume from the latest data, ignoring any stored offset information.
   setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long>)
   Consume from the specified positions.
   When the checkpoint mechanism is enabled, the Kafka consumer periodically saves the Kafka offsets
together with the state of the other operators. When the job fails and restarts, Flink recovers from the most
recent checkpoint and resumes consuming the Kafka data from there.
   To get a fault-tolerant Kafka consumer, checkpointing must be enabled:
   env.enableCheckpointing(5000); // checkpoint every 5 s
   
 3. Automatic Kafka consumer offset committing
   
   How Kafka consumer offsets are committed automatically depends on whether the job has checkpointing enabled.
   With checkpointing disabled: the mode depends only on whether auto commit is enabled in the supplied
Kafka properties, i.e. KAFKA_PERIODIC if enable.auto.commit is true, otherwise DISABLED.
   With checkpointing enabled: the mode depends only on whether committing offsets on checkpoint
completion is enabled, i.e. ON_CHECKPOINTS if so, otherwise DISABLED.
   OffsetCommitMode is an enum with the following three values:
       DISABLED: offset committing is completely disabled.
       ON_CHECKPOINTS: offsets are committed when a checkpoint completes.
       KAFKA_PERIODIC: offsets are committed periodically.
 
 4. Flink Kafka producer
   
   The code reads from nc (netcat) and hands the received data to Kafka via the Flink Kafka producer.
   Code:
package com.lagou.sink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class SinkToKafka {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStreamSource<String> data = env.socketTextStream("linux122", 7777);

		FlinkKafkaProducer<String> producer =
			new FlinkKafkaProducer<>("linux122:9092", "mytopic2", new SimpleStringSchema());
		data.addSink(producer);

		env.execute();
	}
}
   
   